Below we import a standard data science toolkit, including tools for data processing (e.g. Dask, "pandas in parallel"), visualization (e.g. Plotly, Seaborn), and machine-learning frameworks (scikit-learn and similar).
import multiprocessing
import sys
import os
from datetime import datetime
import random
module_path = os.path.abspath(os.path.join('../code/'))
if module_path not in sys.path:
    sys.path.append(module_path)
import numpy as np
import pandas as pd
import dask.dataframe as dd
import featuretools as ft
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import shap
import seaborn as sns
import plotly.express as px
from pandas_profiling import ProfileReport
from rnormalize import normalize_user_id
from rtools import get_user_plan
from dateutil import parser
This Jupyter notebook follows the rules of reproducible research: it can be re-run (after preparing the environment defined in requirements.txt), producing similar output values and exactly the same business conclusions. The only requirement is the same file/directory structure: the notebook should live in the notebooks directory, with the data one level up in the data directory and additional sources in the code directory.
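The directory assumptions above can be checked up front; the sketch below (file names mirror those read later in this notebook) reports any missing data files instead of failing deep inside a later cell:

```python
import os

def check_layout(expected_files):
    """Return the subset of expected paths that is missing on disk."""
    return [path for path in expected_files if not os.path.exists(path)]

# Paths relative to the notebooks/ directory, as described above.
missing = check_layout([
    '../data/rev-devices.csv',
    '../data/rev-notifications.csv',
    '../data/rev-users.csv',
    '../data/rev-transactions.csv',
])
if missing:
    print(f'Missing data files: {missing}')
```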
random.seed(42)
np.random.seed(42)  # pandas .sample() and scikit-learn draw from NumPy's RNG, not the stdlib one
devices_filename = '../data/rev-devices.csv'
notifications_filename = '../data/rev-notifications.csv'
users_filename = '../data/rev-users.csv'
transactions_filename = '../data/rev-transactions.csv'
devices = dd.read_csv(devices_filename)
f'File {devices_filename} was successfully read'
notifications = dd.read_csv(notifications_filename)
f'File {notifications_filename} was successfully read'
users = dd.read_csv(users_filename)
f'File {users_filename} was successfully read'
transactions = dd.read_csv(transactions_filename)
f'File {transactions_filename} was successfully read'
The objective of the Data Book is to provide useful summaries of the data that are easily understood by everyone.
table: devices.csv
a table of devices associated with a user
table: users.csv
a table of user data
table: notifications.csv
a table of notifications that a user has received
table: transactions.csv
a table with transactions that a user made
Here we check how large the data is: the number of rows in each table and how many columns we have.
f'There are {devices.shape[0].compute()} rows in the "devices" table'
f'There are {notifications.shape[0].compute()} rows in the "notifications" table'
f'There are {users.shape[0].compute()} rows in the "users" table'
print(f'''There are {transactions.shape[0].compute()} rows in the "transactions" table.
The transactions table is {int(transactions.shape[0].compute() / users.shape[0].compute())} times bigger
than the number of users.''')
devices.dtypes
This is rather self-explanatory. It tells what type of device the user is using (column: brand).
notifications.dtypes
This is an interesting table. We're especially thrilled to investigate what type of messages are sent to the users (column: reason). They're probably mostly "push" messages.
users.dtypes
Data on the user. We have information on the type of plan, as well as some extra data about the number of marketing messages.
transactions.dtypes
Transactional data. It should be possible to find the transaction type (atm/transfer/online), as well as the country of the merchant.
users_df_profile = ProfileReport(users.sample(frac=0.25).compute(),
progress_bar=False,
title='Pandas Profiling Report ("Users" dataframe)',
html={'style':{'full_width':True}})
users_df_profile.to_notebook_iframe() # .to_widgets()
users_df_profile.to_file(output_file="out/users_df_profile_p0.25.html") # saving the whole report to an html file
devices_df_profile = ProfileReport(devices.sample(frac=0.25).compute(),
progress_bar=False,
title='Pandas Profiling Report ("Devices" dataframe)',
html={'style':{'full_width':True}})
devices_df_profile.to_notebook_iframe() # .to_widgets()
devices_df_profile.to_file(output_file="out/devices_df_profile_p0.25.html")
notifications_df_profile = ProfileReport(notifications.sample(frac=0.1).compute(),
progress_bar=False,
title='Pandas Profiling Report ("Notifications" dataframe)',
html={'style':{'full_width':True}})
notifications_df_profile.to_notebook_iframe() # .to_widgets()
notifications_df_profile.to_file(output_file="out/notifications_df_profile_p0.10.html")
transactions_pd = transactions.compute() # store df in memory
transactions_df_profile = ProfileReport(transactions_pd,
minimal=True,
pool_size=multiprocessing.cpu_count()-1,
title='Pandas Profiling Report ("Transactions" dataframe)',
html={'style':{'full_width':True}})
transactions_df_profile.to_notebook_iframe() # .to_widgets()
transactions_df_profile.to_file(output_file="out/transactions_df_profile_pALL.html")
users_pd = users.compute()
users_pd['user_id'] = users_pd['user_id'].apply(lambda x: normalize_user_id(x)).astype(int)
devices_pd = devices.compute().reset_index().rename(columns={'index': 'device_id'})
devices_pd['user_id'] = devices_pd['user_id'].apply(lambda x: normalize_user_id(x)).astype(int)
notifications_pd = notifications.compute().reset_index().rename(columns={'index': 'notification_id'})
notifications_pd['user_id'] = notifications_pd['user_id'].apply(lambda x: normalize_user_id(x)).astype(int)
transactions_pd['user_id'] = transactions_pd['user_id'].apply(lambda x: normalize_user_id(x)).astype(int)
transactions_pd.memory_usage(deep=True)
Let's visualize some data!
notifications_pd.reason.value_counts()
notifications_pd.channel.value_counts()
The 'SMS' channel is used quite rarely, so we're going to omit it in our visualizations.
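To back this claim up with numbers, the normalized channel share can be computed directly; a minimal sketch on a toy frame (the real notifications_pd built above has the same channel column):

```python
import pandas as pd

# Toy stand-in for notifications_pd with the same 'channel' column.
toy = pd.DataFrame({'channel': ['PUSH'] * 95 + ['EMAIL'] * 4 + ['SMS'] * 1})

# Each channel's share of all notifications.
share = toy['channel'].value_counts(normalize=True)
print(share)

# Drop the rarely used channel before plotting.
filtered = toy[toy['channel'] != 'SMS']
```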
notifications_plot_data = notifications_pd[notifications_pd.channel != 'SMS'].sample(n=30*1000)
notifications_plot_data['user_plan'] = notifications_plot_data['user_id'].apply(lambda x: get_user_plan(x, users_pd))
notifications_plot_data['total_notifications'] = 1
agg_notifications_plot_data = notifications_plot_data.groupby(['reason',
'channel',
'status',
'user_plan']).agg({'total_notifications': 'sum'})
fig = px.bar(agg_notifications_plot_data.reset_index(),
x="reason",
y='total_notifications',
color="user_plan",
barmode="stack",
facet_row="channel", facet_col="status")
fig.show()
transactions_pd
transactions_pd.ea_merchant_country.value_counts()
transactions_for_plot = transactions_pd.sample(n=10*1000)
transactions_for_plot['amount_usd'] = np.log(transactions_for_plot['amount_usd'])  # log scale; note: non-positive amounts yield -inf/NaN
g = sns.catplot(x="direction",
y="amount_usd",
hue="transactions_type",
col="transactions_state",
data=transactions_for_plot, kind="strip",
height=4, aspect=0.5)
print(f'First transaction in the data is from {transactions_pd.created_date.min()}')
print(f' and the last one is from {transactions_pd.created_date.max()}')
max_date = transactions_pd.created_date.max()
max_date_dt = parser.parse(max_date)
max_date_dt
entities = {"users" : (users_pd, "user_id"),
"devices" : (devices_pd, "device_id"), # we really need that index-as-column
# https://github.com/FeatureLabs/featuretools/issues/130
"notifications" : (notifications_pd, "notification_id"),
"transactions": (transactions_pd, 'transaction_id'),
}
relationships = [("users", "user_id", "transactions", "user_id"),
("users", "user_id", "notifications", "user_id"),
("users", "user_id", "devices", "user_id")]
feature_matrix_users, features_defs = ft.dfs(entities=entities,
relationships=relationships,
target_entity="users", # we wish to profile users, so it's target
# n_jobs=multiprocessing.cpu_count()-1,
# I'm getting "zmq.error.ZMQError: Too many open files" - MacOS issue?
verbose=True)
# features_defs
feature_matrix_users
Please provide the business justification and associated visualisations / rationale in choosing your definition of engagement.
Using my domain knowledge and intuition (as an active Revolut user), I'd like to segment users into 3 segments:
Users who make transactions in their home country. They're not afraid to use Revolut on a daily basis, even for local ATMs and online purchases.
Users who only make transactions abroad. Their main incentive is to save money on the currency-exchange spread. They're mostly people with non-premium subscriptions.
Users who don't make many transactions, even when they're abroad. Low activity, low responsiveness to marketing.
It's practically impossible (with the current state of the data) to detect whether a user goes abroad but doesn't use his/her Revolut actively there. While it's very easy to count the number of transactions per country, this doesn't answer the question of whether the user went abroad at all. Because of this, we're going to propose a different metric. Customer loyalty is often measured by a metric called "RFV" (recency, frequency, volume). Let's modify it by adding another dimension: response to "push" notifications. Such an RFV-R metric should be validated against the 3 segments defined above (super-committed, engaged, unengaged) by adjusting the importance weights.
Because calculating the RFV score is computationally expensive, we're going to calculate this metric only at the last timepoint available in the data. Normally, RFV would be computed at the transaction level and updated after every transaction.
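Before fitting anything, the RFV-R idea can be sketched as a weighted sum of min-max-normalized components. The weights below are illustrative placeholders, not tuned values (later, logistic-regression coefficients play this role):

```python
import pandas as pd

def minmax(s):
    """Scale a series to [0, 1]; a constant series maps to all zeros."""
    rng = s.max() - s.min()
    return (s - s.min()) / rng if rng else s * 0.0

def rfvr_score(df, weights=None):
    weights = weights or {'recency': 0.25, 'frequency': 0.25,
                          'volume': 0.25, 'responsivity': 0.25}
    score = pd.Series(0.0, index=df.index)
    for col, w in weights.items():
        norm = minmax(df[col])
        # Recency and responsivity are "days since", so lower is better: invert.
        if col in ('recency', 'responsivity'):
            norm = 1.0 - norm
        score += w * norm
    return score

demo = pd.DataFrame({'recency': [1, 30], 'frequency': [10, 1],
                     'volume': [50, 2], 'responsivity': [0, 20]})
print(rfvr_score(demo))  # the highly active user scores 1.0, the inactive one 0.0
```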
f'Profiling {len(users_pd)} users... it may take a while'
transactions_pd['created_date'] = pd.to_datetime(transactions_pd['created_date'])  # ensure datetimes before extracting parts
transactions_pd['year-month'] = transactions_pd['created_date'].dt.strftime('%Y-%m')
# we're using a groupby approach
recency = transactions_pd.groupby('user_id').agg({'created_date': 'max'}) # recency
# recency
frequency = transactions_pd.groupby(['user_id', 'year-month'])['transaction_id'].count() # frequency
# frequency
volume = transactions_pd.groupby('user_id')['transaction_id'].count() # volume
abs_volume = transactions_pd.groupby('user_id')['amount_usd'].sum()
# volume
# responsivity
notifications_pd['created_date'] = pd.to_datetime(notifications_pd['created_date'])  # ensure datetimes for the date arithmetic below
last_notification = notifications_pd.groupby('user_id').agg({'created_date': 'max'})
users_enriched_pd = users_pd.copy()
users_enriched_pd['recency'] = users_enriched_pd['user_id'].apply(lambda x: (max_date_dt -
recency.loc[x]['created_date']).days
if x in recency.index else None)
users_enriched_pd['frequency'] = users_enriched_pd['user_id'].apply(lambda x: frequency.loc[x].values.mean()
if x in frequency.index else None)
users_enriched_pd['volume'] = users_enriched_pd['user_id'].apply(lambda x: volume.loc[x]
                                                                 if x in volume.index else None)
users_enriched_pd['responsivity'] = users_enriched_pd['user_id'].apply(lambda x: (recency.loc[x]['created_date'] -
last_notification.loc[x]['created_date']).days
if (x in recency.index) and (x in last_notification.index) else None)
users_enriched_pd['responsivity'] = users_enriched_pd['responsivity'].apply(lambda x: None if x < 0 else x)
# it may happen that a notification was sent recently, and the user hasn't done anything
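The per-row .apply lookups above work but scale poorly; the same enrichment can usually be expressed as one groupby plus one left merge per aggregate. A sketch on toy data (column names mirror the real frames):

```python
import pandas as pd

transactions = pd.DataFrame({
    'user_id': [1, 1, 2],
    'transaction_id': [10, 11, 12],
    'created_date': pd.to_datetime(['2019-01-01', '2019-03-01', '2019-02-01']),
})
users = pd.DataFrame({'user_id': [1, 2, 3]})
max_date = transactions['created_date'].max()

# One pass over transactions computes both aggregates.
agg = transactions.groupby('user_id').agg(
    last_tx=('created_date', 'max'),
    volume=('transaction_id', 'count'),
).reset_index()
agg['recency'] = (max_date - agg['last_tx']).dt.days

# A left merge keeps users with no transactions (their aggregates become NaN).
enriched = users.merge(agg.drop(columns='last_tx'), on='user_id', how='left')
print(enriched)
```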
sns.pairplot(users_enriched_pd[['recency', 'frequency', 'volume', 'responsivity', 'plan']].fillna(0), hue='plan')
Using the logic from above, we're trying to categorize users using simple heuristics and intuition. As a result, we'd like to fit a curve that normalizes our RFV-R score to the range (0, 1).
# engaged (super-committed): premium/metal users, or users transacting in more than 4 countries
country_stats = transactions_pd.groupby(['user_id', 'ea_merchant_country'])['ea_merchant_country'].count()
# country_stats
users_enriched_pd['pseudo-label'] = users_enriched_pd.apply(lambda x: (x['plan'] in ['PREMIUM',
'PREMIUM_FREE',
'METAL',
'METAL_FREE']) or
(len(country_stats.loc[x['user_id']]) > 4 if x['user_id'] in country_stats.index else False),
axis=1)
# unengaged: everyone else (few countries, non-premium plans)
users_enriched_pd['pseudo-label'].value_counts()
users_enriched_pd['pseudo-label'] = users_enriched_pd['pseudo-label'].astype(int)
y = users_enriched_pd['pseudo-label'].values
rfv_columns = ['recency', 'frequency', 'volume', 'responsivity']
X = users_enriched_pd[rfv_columns].fillna(0).values
y
clf = LogisticRegression(solver='lbfgs').fit(X, y)
clf.score(X, y)
coefficients = pd.DataFrame({"Feature": rfv_columns, "Coefficients": np.transpose(clf.coef_[0])})
coefficients
clf.classes_
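The rfv_score computed in the next cell is simply the logistic sigmoid of the fitted linear combination, so the coefficients above can be read as log-odds importance weights. A self-contained check on synthetic data (variable names here are illustrative, not from the notebook):

```python
import numpy as np
from scipy.special import expit  # logistic sigmoid: 1 / (1 + exp(-z))
from sklearn.linear_model import LogisticRegression

rng = np.random.RandomState(42)
X_demo = rng.rand(200, 4)                               # four RFV-R-like features
y_demo = (X_demo[:, 1] + X_demo[:, 2] > 1).astype(int)  # synthetic label

demo_clf = LogisticRegression(solver='lbfgs').fit(X_demo, y_demo)

# predict_proba's positive-class column equals sigmoid(X @ coef + intercept).
manual = expit(X_demo @ demo_clf.coef_[0] + demo_clf.intercept_[0])
print(np.allclose(manual, demo_clf.predict_proba(X_demo)[:, 1]))  # True
```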
users_enriched_pd['rfv_score'] = users_enriched_pd.apply(lambda x:
clf.predict_proba(np.nan_to_num(np.array([x['recency'],
x['frequency'],
x['volume'],
x['responsivity']]).reshape(1, -1)))[0][1],
axis=1)
users_enriched_pd['rfv_segment'] = users_enriched_pd.apply(lambda x:
clf.predict(np.nan_to_num(np.array([x['recency'],
x['frequency'],
x['volume'],
x['responsivity']]).reshape(1, -1)))[0],
axis=1)
users_enriched_pd['rfv_score'].hist()
users_enriched_pd.columns
sns.pairplot(users_enriched_pd[['recency', 'frequency', 'rfv_segment', 'pseudo-label',
'volume', 'responsivity']].fillna(0),
hue='rfv_segment', diag_kind='hist')
# 20% of committed users should be responsible for 80% of transactions (Pareto check)
sorted_users_enriched = users_enriched_pd.sort_values('rfv_score', ascending=False)
top_20 = sorted_users_enriched.head(n=int(0.2 * len(sorted_users_enriched))).copy()  # .copy() avoids SettingWithCopyWarning below
tail_80 = sorted_users_enriched.tail(n=int(0.8 * len(sorted_users_enriched))).copy()
top_20['total_usd'] = top_20['user_id'].apply(lambda x: abs_volume.loc[x] if x in abs_volume.index else None)
tail_80['total_usd'] = tail_80['user_id'].apply(lambda x: abs_volume.loc[x] if x in abs_volume.index else None)
top_amount_usd = top_20['total_usd'].values.sum()
top_no_of_transactions = top_20['volume'].values.sum()
tail_amount_usd = tail_80['total_usd'].fillna(0).values.sum()
tail_no_of_transactions = tail_80['volume'].fillna(0).values.sum()
top_amount_usd / (top_amount_usd+tail_amount_usd)
top_no_of_transactions / (top_no_of_transactions+tail_no_of_transactions)
# TODO: calculate inequalities between segment of engaged users and not-engaged users
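One way to fill in the TODO above is a Gini coefficient over per-user transaction counts or USD amounts; a sketch using the standard closed form derived from the Lorenz curve:

```python
import numpy as np

def gini(values):
    """Gini coefficient of non-negative values: 0 = perfectly equal, -> 1 = concentrated."""
    v = np.sort(np.asarray(values, dtype=float))
    n = v.size
    if n == 0 or v.sum() == 0:
        return 0.0
    index = np.arange(1, n + 1)
    # Closed form of the area between the Lorenz curve and the diagonal.
    return (2 * (index * v).sum()) / (n * v.sum()) - (n + 1) / n

print(gini([1, 1, 1, 1]))   # 0.0  - perfectly equal
print(gini([0, 0, 0, 10]))  # 0.75 - highly concentrated
```

Applied to the per-segment 'total_usd' values above, this gives a single inequality number to compare between engaged and not-engaged users.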
users_enriched_pd = users_enriched_pd.drop(['rfv_score', 'pseudo-label'], axis=1)
labels = users_enriched_pd['rfv_segment'].values
users_enriched_pd = users_enriched_pd.drop(['rfv_segment'], axis=1)
# rerun deep-feature-synthesis
entities2 = {"users" : (users_enriched_pd, "user_id"),
"devices" : (devices_pd, "device_id"),
"notifications" : (notifications_pd, "notification_id"),
"transactions": (transactions_pd, 'transaction_id'),}
feature_matrix_users, features_defs = ft.dfs(entities=entities2,
relationships=relationships,
target_entity="users",
verbose=True)
# feature_matrix_users
feature_matrix_users_cat = pd.get_dummies(feature_matrix_users)
# feature_matrix_users_cat
feature_matrix_users_cat_nonulls = feature_matrix_users_cat.fillna(0)
X_train, X_test, y_train, y_test = train_test_split(feature_matrix_users_cat_nonulls, labels,
test_size=0.20,
random_state=42)
rf = RandomForestClassifier(random_state=42)
gdb = GradientBoostingClassifier(random_state=42)
rf.fit(X_train, y_train)
gdb.fit(X_train, y_train) # takes 2-3 minutes to compute
rf_preds = rf.predict(X_test)
gdb_preds = gdb.predict(X_test)
target_names = ['Not-engaged', 'Engaged user']
rf.classes_, target_names
print(classification_report(y_test, rf_preds, target_names=target_names))
print(classification_report(y_test, gdb_preds, target_names=target_names))
explainer = shap.TreeExplainer(rf)
shap_values = explainer.shap_values(X_train, check_additivity=False)
shap.summary_plot(shap_values, feature_matrix_users_cat_nonulls)
explainer = shap.TreeExplainer(gdb)
shap_values = explainer.shap_values(feature_matrix_users_cat_nonulls)
shap.summary_plot(shap_values, feature_matrix_users_cat_nonulls)
We would take one action with one group of customers and a different action (or often no action at all) with a control group, and then compare the results. This means selecting participants and assigning them to "treatment" and control conditions, preferably using a random procedure (alternatively, stratified or systematic sampling).
The final action would be applied to the entire population of customers only if the experiment is a success.
The outcomes must be simple to analyze, and the data should be easy to interpret. We must monitor the research situation to ensure that there are no differences between the treatment and control conditions other than the "intervention" itself.
Finally, we measure the selected outcomes for both groups and apply statistical analysis to determine whether the groups differ on those dependent-variable measures.
Outcome metric - loss/gain of churn rate
Intermediate metric - change of RFV score
Comparison method - one-way ANOVA test
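The proposed comparison can be run with scipy.stats.f_oneway; a sketch on synthetic treatment/control RFV-score samples (the effect size and group sizes here are made up):

```python
import numpy as np
from scipy import stats

rng = np.random.RandomState(42)
control = rng.normal(loc=0.50, scale=0.10, size=500)    # baseline RFV scores
treatment = rng.normal(loc=0.55, scale=0.10, size=500)  # shifted by the intervention

f_stat, p_value = stats.f_oneway(control, treatment)
print(f'F = {f_stat:.2f}, p = {p_value:.2g}')

# With exactly two groups, one-way ANOVA reduces to a two-sample t-test (F = t^2).
t_stat, t_p = stats.ttest_ind(control, treatment)
assert np.isclose(f_stat, t_stat ** 2)
```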